package cn.xshi.log.client.util;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * @Desc Kafka utility component: builds a producer from the {@code plug.log.kafka.*}
 *       properties and sends string messages to a topic.
 *       Sending is best-effort: failures are logged and never thrown to the caller
 *       from the send itself.
 *       NOTE(review): a new producer is created and closed for every call; the Kafka
 *       documentation describes KafkaProducer as thread-safe and recommends sharing
 *       one instance — consider caching it if throughput matters.
 * @Author 邓纯杰
 * @CreateTime 2012-12-12 12:12:12
 */
@Component
public class KafkaUtils {

    private static final Logger LOGGER = Logger.getLogger(KafkaUtils.class.getName());

    /** Kafka bootstrap servers; when blank no producer is created and sends are skipped. */
    @Value("${plug.log.kafka.address:}")
    private String plugLogKafkaAddress;

    /** Producer {@code acks} setting; when blank the Kafka client default is used. */
    @Value("${plug.log.kafka.acks:}")
    private String plugLogKafkaAcks;

    /** Producer {@code retries} setting (number of retries). */
    @Value("${plug.log.kafka.retries:0}")
    private String plugLogKafkaRetries;

    /** Producer {@code batch.size} setting. */
    @Value("${plug.log.kafka.batchSize:0}")
    private String plugLogKafkaBatchSize;

    /** Producer {@code linger.ms} setting. */
    @Value("${plug.log.kafka.lingerMs:1}")
    private String plugLogKafkaLingerMs;

    /** Producer {@code buffer.memory} setting. */
    @Value("${plug.log.kafka.bufferMemory:33554432}")
    private String plugLogKafkaBufferMemory;

    public String getPlugLogKafkaAddress() {
        return plugLogKafkaAddress;
    }

    public void setPlugLogKafkaAddress(String plugLogKafkaAddress) {
        this.plugLogKafkaAddress = plugLogKafkaAddress;
    }

    public String getPlugLogKafkaAcks() {
        return plugLogKafkaAcks;
    }

    public void setPlugLogKafkaAcks(String plugLogKafkaAcks) {
        this.plugLogKafkaAcks = plugLogKafkaAcks;
    }

    public String getPlugLogKafkaRetries() {
        return plugLogKafkaRetries;
    }

    public void setPlugLogKafkaRetries(String plugLogKafkaRetries) {
        this.plugLogKafkaRetries = plugLogKafkaRetries;
    }

    public String getPlugLogKafkaBatchSize() {
        return plugLogKafkaBatchSize;
    }

    public void setPlugLogKafkaBatchSize(String plugLogKafkaBatchSize) {
        this.plugLogKafkaBatchSize = plugLogKafkaBatchSize;
    }

    public String getPlugLogKafkaLingerMs() {
        return plugLogKafkaLingerMs;
    }

    public void setPlugLogKafkaLingerMs(String plugLogKafkaLingerMs) {
        this.plugLogKafkaLingerMs = plugLogKafkaLingerMs;
    }

    public String getPlugLogKafkaBufferMemory() {
        return plugLogKafkaBufferMemory;
    }

    public void setPlugLogKafkaBufferMemory(String plugLogKafkaBufferMemory) {
        this.plugLogKafkaBufferMemory = plugLogKafkaBufferMemory;
    }

    /**
     * Adds a setting to the producer configuration only when it has a usable value.
     * A blank value is skipped so the Kafka client default applies; this also avoids
     * the NullPointerException that {@link Properties#put} raises for a null value.
     *
     * @param props producer configuration being built
     * @param key   Kafka configuration key
     * @param value configured value, may be null or blank
     */
    private static void putIfNotBlank(Properties props, String key, String value) {
        if (StringUtils.isNotBlank(value)) {
            props.put(key, value);
        }
    }

    /**
     * Builds a new producer with String key/value serializers from the configured settings.
     *
     * @return a new producer, or {@code null} when no Kafka address is configured
     */
    private KafkaProducer<String, String> createProducer() {
        if (StringUtils.isBlank(plugLogKafkaAddress)) {
            return null;
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", plugLogKafkaAddress);
        // Optional settings: the default for acks is an empty string, which the Kafka
        // client rejects as an invalid value, so blank entries must not be passed on.
        putIfNotBlank(props, "acks", plugLogKafkaAcks);
        putIfNotBlank(props, "retries", plugLogKafkaRetries);
        putIfNotBlank(props, "batch.size", plugLogKafkaBatchSize);
        putIfNotBlank(props, "linger.ms", plugLogKafkaLingerMs);
        putIfNotBlank(props, "buffer.memory", plugLogKafkaBufferMemory);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(props);
    }

    /**
     * Sends a message without waiting for the broker's result.
     * Does nothing when the topic is blank or no Kafka address is configured.
     * A failure raised by the send call is logged and not rethrown.
     *
     * @param topic   target topic
     * @param message message payload
     */
    public void send(String topic, String message) {
        if (StringUtils.isBlank(topic)) {
            return;
        }
        Producer<String, String> producer = createProducer();
        if (null == producer) {
            return;
        }
        try {
            producer.send(new ProducerRecord<>(topic, message));
        } catch (Exception e) {
            // Best-effort delivery: report the failure instead of silently dropping it.
            LOGGER.log(Level.WARNING, "Failed to send Kafka message to topic " + topic, e);
        } finally {
            producer.close();
        }
    }

    /**
     * Sends a message and blocks until the broker acknowledges it.
     * Returns {@code null} when the topic is blank, when no Kafka address is
     * configured, or when the send fails (the failure is logged).
     *
     * @param topic   target topic
     * @param message message payload
     * @return metadata of the stored record, or {@code null} if nothing was sent
     */
    public RecordMetadata syncSend(String topic, String message) {
        // Validate before creating the producer so a bad topic cannot leak an open producer.
        if (StringUtils.isBlank(topic)) {
            return null;
        }
        Producer<String, String> producer = createProducer();
        if (null == producer) {
            return null;
        }
        RecordMetadata recordMetadata = null;
        try {
            recordMetadata = producer.send(new ProducerRecord<>(topic, message)).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOGGER.log(Level.WARNING, "Interrupted while sending Kafka message to topic " + topic, e);
        } catch (Exception e) {
            LOGGER.log(Level.WARNING, "Failed to send Kafka message to topic " + topic, e);
        } finally {
            producer.close();
        }
        return recordMetadata;
    }
}
